/**
 *  Copyright 2013 University Pierre & Marie Curie - UMR CNRS 7606 (LIP6/MoVe)
 *  All rights reserved.   This program and the accompanying materials
 *  are made available under the terms of the Eclipse Public License v1.0
 *  which accompanies this distribution, and is available at
 *  http://www.eclipse.org/legal/epl-v10.html
 *
 *  Initial contributor:
 *    Lom M. Hillah - <lom-messan.hillah@lip6.fr>
 *
 *  Mailing list:
 *    lom-messan.hillah@lip6.fr
 */
package fr.lip6.msr4j.asf.utils.db;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Transaction;

import cern.colt.Arrays;

import com.google.common.collect.Sets;

import fr.lip6.msr4j.asf.datamodel.ASFCommitter;
import fr.lip6.msr4j.asf.datamodel.ASFProjectsCommitters;
import fr.lip6.msr4j.asf.datamodel.ASFRelTypes;
import fr.lip6.msr4j.asf.datamodel.SVNProject;
import fr.lip6.msr4j.asf.utils.config.ASFProperties;
import fr.lip6.msr4j.datamodel.nodes.NodeLabel;
import fr.lip6.msr4j.utils.config.ConcurrencyConfig;
import fr.lip6.msr4j.utils.db.Neo4JDBHandler;

/**
 * Handles the population of ASF projects and committers in the DB.
 * 
 * @author lom
 * 
 */
@SuppressWarnings("unchecked")
public class ConcurrentASFCommittersDBHandler extends Neo4JDBHandler {

	private final Map<SVNProject, Node> projectNodes;
	private final ConcurrentMap<ASFCommitter, Node> committerNodes;
	private final ExecutorService executorPool;

	public ConcurrentASFCommittersDBHandler(String dbPath) {
		super(dbPath);
		projectNodes = new HashMap<SVNProject, Node>();
		committerNodes = new ConcurrentHashMap<ASFCommitter, Node>();
		executorPool = Executors
				.newFixedThreadPool(ConcurrencyConfig.POOL_SIZE);
	}

	@Override
	public void populateDB(ASFProjectsCommitters projects) {
		String[] propToIndex = { ASFProperties.SVN_ID, ASFProperties.NAME,
				ASFProperties.IS_APACHE_MEMBER, ASFProperties.WEIGHT };
		startAutoIndexingNodeProperties(propToIndex);
		startAutoIndexingRelProperty(ASFProperties.WEIGHT);
		final long start = System.nanoTime();
		Transaction tx = beginTransaction();
		try {
			logger.info("Beginning insertions...");
			// Updating operations go here
			insertProjects(projects);
			insertCommitters(projects);
			linkCommitters();

			tx.success();
			logger.info("Successfully inserted all projects, committers and their relationships");
		} finally {
			tx.finish();
			executorPool.shutdown();
			shutDownDb();
		}
		final long end = System.nanoTime();
		logger.info("End of transaction. All resources shut down."
				+ "Time taken: " + (end - start) / NANO);
	}

	private void linkCommitters() {
		final List<Callable<Boolean>> partitions = new ArrayList<Callable<Boolean>>();
		for (final ASFCommitter c : committerNodes.keySet()) {
			partitions.add(new CommitterLinker(c));
		}
		try {
			final List<Future<Boolean>> values = executorPool.invokeAll(
					partitions, ALLOWED_EXEC_TIME1000, TimeUnit.SECONDS);
			logger.info("Finished linking committers. How many processed: {}.",
					values.size());
		} catch (InterruptedException e) {
			logger.error(e.getMessage());
			logger.error(Arrays.toString(e.getStackTrace()));
		}

	}

	private void insertCommitters(final ASFProjectsCommitters projects) {
		final Set<ASFCommitter> cms = projects.getCommitters();

		final List<Callable<Boolean>> partitions = new ArrayList<Callable<Boolean>>();
		for (final ASFCommitter c : cms) {
			partitions.add(new CommitterNode(c));
		}
		try {
			final List<Future<Boolean>> values = executorPool.invokeAll(
					partitions, ALLOWED_EXEC_TIME1000, TimeUnit.SECONDS);
			logger.info(
					"Finished inserting committers. How many processed: {}.",
					values.size());
		} catch (InterruptedException e) {
			logger.error(e.getMessage());
			logger.error(Arrays.toString(e.getStackTrace()));
		}

	}

	private void insertProjects(final ASFProjectsCommitters projects) {
		Set<SVNProject> prs = projects.getSVNProjects();
		for (final SVNProject p : prs) {
			Node aPr = createNode(NodeLabel.PROJECT);
			aPr.setProperty(ASFProperties.NAME, p.getName());
			aPr.setProperty(ASFProperties.WEIGHT, p.degree());
			projectNodes.put(p, aPr);
		}
	}

	private class CommitterNode implements Callable<Boolean> {

		private ASFCommitter comm;

		public CommitterNode(ASFCommitter c) {
			this.comm = c;
		}

		@Override
		public Boolean call() throws Exception {
			Node aComm = createNode(NodeLabel.COMMITTER);
			aComm.setProperty(ASFProperties.SVN_ID, comm.getSvnId());
			aComm.setProperty(ASFProperties.NAME, comm.getName());
			aComm.setProperty(ASFProperties.IS_APACHE_MEMBER, true);
			aComm.setProperty(ASFProperties.WEIGHT, comm.weight());
			committerNodes.put(comm, aComm);
			// Link committer to all projects he is involved in
			Set<SVNProject> pr = comm.getProjects();
			for (SVNProject p : pr) {
				Node prNode = projectNodes.get(p);
				Relationship r1 = aComm.createRelationshipTo(prNode,
						ASFRelTypes.PARTICIPATES);
				Relationship r2 = prNode.createRelationshipTo(aComm,
						ASFRelTypes.COMMITTER);
				double weight;
				// Relationship weight = half-weight if Apache Member, 1
				// otherwise
				if (comm.isApacheMember()) {
					weight = comm.weight() * HALF;
				} else {
					weight = 1;
				}
				r1.setProperty(ASFProperties.WEIGHT, weight);
				r2.setProperty(ASFProperties.WEIGHT, weight);
			}
			return Boolean.TRUE;
		}
	}

	private class CommitterLinker implements Callable<Boolean> {
		private ASFCommitter comm;

		public CommitterLinker(ASFCommitter c) {
			this.comm = c;
		}

		@Override
		public Boolean call() throws Exception {
			// Link this committer to his colleagues (common projects)
			Set<ASFCommitter> colleagues = comm.getColleagues();
			for (ASFCommitter c : colleagues) {
				Set<SVNProject> theirs = c.getProjects();
				Set<SVNProject> mine = comm.getProjects();
				Set<SVNProject> common;
				if (theirs.size() < mine.size()) {
					common = Sets.intersection(theirs, mine);
				} else {
					common = Sets.intersection(mine, theirs);
				}
				Node colNode = committerNodes.get(c);
				Node miNode = committerNodes.get(comm);
				Relationship r = miNode.createRelationshipTo(colNode,
						ASFRelTypes.WORKSWITH);
				// Relationship weigth = number of common projects
				r.setProperty(ASFProperties.WEIGHT, common.size());
			}
			return Boolean.TRUE;
		}
	}
}
